[dagster] solidの中で使われる特定の値をパラメータ化する
大阪オフィスの玉井です。
dagsterは、パイプラインの実行時に指定した値を使用して、データ処理を流すことができます。
やってみた
環境
- macOS Big Sur 11.3.1
- dagster 0.10.9
パラメータ化する
下記のような処理で考えてみます。
@solid def download_csv(context): df = pd.read_csv("https://www.police.pref.nara.jp/cmsfiles/contents/0000004/4634/nara_2020zidouhanbaikinerai.csv", encoding="shift_jis") return df
ファイルのダウンロード先のURL自体がハードコーディングされています。これでも問題なく動きますが、場合によっては、URL自体が変わることも考えられます。その度にコードを修正するのもアレですので、あまりURLベタ書きは望ましく有りません。
dagsterの場合、下記のように書くことができます。
@solid(config_schema={"url": str}) def download_csv(context): df = pd.read_csv(context.solid_config["url"], encoding="shift_jis") return df
こう記述することで、url
という変数に格納されるURLを、ファイルのダウンロード先として参照するようにできます。また、@solid
の横のconfig_schema
という記述で、指定する値の型を指定することができます(今回はURLを想定しているので文字列を期待する)。
この変数に実際に入れる値はどうするのか?というのは、dagitのUI上で指定します。
パラメータを指定してパイプラインを動かす
今回は下記のパイプラインを動かしてみます。
import pandas as pd from dagster import execute_pipeline, pipeline, solid @solid(config_schema={"url": str}) def download_csv(context): df = pd.read_csv(context.solid_config["url"], encoding="shift_jis") return df @solid def count_municipal(context, df): result = df['市区町村(発生地)'].value_counts().index[0] context.log.info(f'このデータ上で一番犯罪が多かった奈良県の市区町村は{result}です') @pipeline def configurable_pipeline(): count_municipal(download_csv())
dagitを実行します。
dagit -f donwload_nara.py
playrgroundというタブを開きます。
ここに、url
という変数に入れる値を設定します。設定の仕方は、下記のように、yaml形式で記述します。
solids: download_csv: config: url: "https://www.police.pref.nara.jp/cmsfiles/contents/0000004/4634/nara_2020zidouhanbaikinerai.csv
パイプラインを実行します。ちゃんと、出力が出てきましたね。
playrgroundタブのConfigで、ファイルのダウンロードURLを別のものに変えれば、当然、出力も変わります(出力メッセージを変えています)。
おわりに
この機能を使うことで、より色々な人が汎用的に使えるパイプラインを組むことが出来るようになります。